feat(file-based): changes for not mirroring paths #205
Conversation
📝 Walkthrough

The pull request introduces a new boolean field controlling whether directory structure is preserved during file transfer.
Actionable comments posted: 1
🧹 Nitpick comments (6)
airbyte_cdk/sources/file_based/exceptions.py (1)
132-149: Add type annotations to improve type safety, wdyt?

The message formatting looks great! Consider adding type annotations:

```diff
-def format_duplicate_files_error_message(stream_name: str, duplicated_files_names: List):
+def format_duplicate_files_error_message(stream_name: str, duplicated_files_names: List[Dict[str, List[str]]]) -> str:
```

🧰 Tools
🪛 GitHub Actions: Linters
[error] 132-132: Function is missing a return type annotation
[error] 132-132: Missing type parameters for generic type "List"
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
138-146: Consider improving null safety in the config check, wdyt?

The logic looks good, but we could make the null checks more explicit:

```diff
 def preserve_subdirectories_directories(self) -> bool:
     # fall back to preserve subdirectories if config is not present or incomplete
     if (
         self.config
         and hasattr(self.config, "delivery_options")
-        and hasattr(self.config.delivery_options, "preserve_subdirectories_directories")
+        and self.config.delivery_options is not None
+        and hasattr(self.config.delivery_options, "preserve_subdirectories_directories")
     ):
         return self.config.delivery_options.preserve_subdirectories_directories
     return True
```

🧰 Tools
🪛 GitHub Actions: Linters
[error] 145-145: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (1)
17-22: Consider enhancing the field description for clarity, wdyt?

The implementation looks good! Consider making the description more descriptive:

```diff
 preserve_subdirectories_directories: bool = Field(
     True,
-    description="Flag indicating we should preserve subdirectories directories",
+    description="When enabled, preserves the subdirectory structure of files during transfer. When disabled, all files are stored in the root directory.",
 )
```

airbyte_cdk/sources/file_based/file_based_source.py (1)
392-399: Add return type and improve null safety, wdyt?

The logic looks good, but we could improve type safety:

```diff
 @staticmethod
-def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec):
+def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec) -> bool:
     # fall back to preserve subdirectories if config is not present or incomplete
-    if hasattr(parsed_config, "delivery_options") and hasattr(
-        parsed_config.delivery_options, "preserve_subdirectories_directories"
+    if (
+        hasattr(parsed_config, "delivery_options")
+        and parsed_config.delivery_options is not None
+        and hasattr(parsed_config.delivery_options, "preserve_subdirectories_directories")
     ):
         return parsed_config.delivery_options.preserve_subdirectories_directories
     return True
```

🧰 Tools
🪛 GitHub Actions: Linters
[error] 393-393: Function is missing a return type annotation
[error] 398-398: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
50-51: Consider a more concise property name.

The property name preserve_subdirectories_directories seems redundant with "directories". What do you think about shortening it to just preserve_subdirectories? wdyt?

```diff
-preserve_subdirectories_directories = True
+preserve_subdirectories = True
```

Also applies to: 59-59
64-66: Update initialization if property name changes.

If you agree with shortening the property name, we should update the initialization too. wdyt?

```diff
-self.preserve_subdirectories_directories = kwargs.pop(
-    self.PRESERVE_SUBDIRECTORIES_KW, True
-)
+self.preserve_subdirectories = kwargs.pop(self.PRESERVE_SUBDIRECTORIES_KW, True)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (3 hunks)
- airbyte_cdk/sources/file_based/exceptions.py (2 hunks)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py (2 hunks)
- airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3 hunks)
- unit_tests/sources/file_based/scenarios/csv_scenarios.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/exceptions.py
[error] 132-132: Function is missing a return type annotation
[error] 132-132: Missing type parameters for generic type "List"
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
[error] 5-44: Import block is un-sorted or un-formatted. This can be fixed automatically using the `--fix` option with ruff.
airbyte_cdk/sources/file_based/file_based_stream_reader.py
[error] 145-145: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
airbyte_cdk/sources/file_based/file_based_source.py
[error] 393-393: Function is missing a return type annotation
[error] 398-398: Item "None" of "DeliveryOptions | None" has no attribute "preserve_subdirectories_directories"
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (5)
airbyte_cdk/sources/file_based/exceptions.py (1)
114-116: LGTM! Clean and consistent error class implementation.

The error class follows the established pattern in the codebase.
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
111-126: LGTM! Well-structured implementation.

The _duplicated_files_names method is well-implemented:
- Uses appropriate data structures (set for O(1) lookups, defaultdict for grouping)
- Clear variable names
- Efficient single-pass algorithm
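A single-pass version of this idea can be sketched in isolation (a simplified illustration, not the PR's exact code: one `defaultdict` replaces the separate set bookkeeping, and `find_duplicate_file_names` is a hypothetical name):

```python
from collections import defaultdict
from os import path

def find_duplicate_file_names(uris):
    # Group every URI under its base file name, then keep only
    # the names that map to more than one full path.
    paths_by_name = defaultdict(list)
    for uri in uris:
        paths_by_name[path.basename(uri)].append(uri)
    return [
        {name: paths}
        for name, paths in paths_by_name.items()
        if len(paths) > 1
    ]
```

Given `["/a/x.csv", "/b/x.csv", "/c/y.csv"]`, only `x.csv` is reported, together with both of its paths.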
137-145: LGTM! Good error handling.

The duplicate file check in compute_slices is well-integrated:
- Only checks when preserve_subdirectories_directories is False
- Provides clear error messages with file details
- Proper error propagation
unit_tests/sources/file_based/scenarios/csv_scenarios.py (2)
520-538: LGTM! Well-structured delivery options.

The delivery options under "Copy Raw Files" are well-defined:
- Clear property structure
- Descriptive title and description
- Appropriate default value
552-570: Consider consolidating delivery options.

I notice we have delivery options defined both under "Copy Raw Files" and at the root level. Is this intentional duplication, or should we consolidate them? wdyt?
✅ Verification successful
The delivery options duplication is intentional and correct.
The delivery_options appear in two different contexts:
- At the root level for global configuration (order=8)
- Within the "Copy Raw Files" delivery method for specific file transfer settings (order=2)
This allows users to configure delivery options both globally and specifically for file transfers when needed.
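For illustration, the two placements could look roughly like this in the generated spec (a hypothetical sketch based only on the field names and `order` values mentioned above, not the actual schema):

```python
# Hypothetical shape of the connector spec with delivery_options in both places.
spec_fragment = {
    "delivery_options": {  # root level, global configuration (order=8)
        "preserve_subdirectories_directories": True,
    },
    "delivery_method": {
        "title": "Copy Raw Files",
        "delivery_options": {  # per-method file transfer settings (order=2)
            "preserve_subdirectories_directories": True,
        },
    },
}
```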
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```shell
#!/bin/bash
# Check if both delivery options are used in the codebase
rg -A 5 "delivery_options" | grep -v "test"
```

Length of output: 2562

Script:

```shell
#!/bin/bash
# Search for class definitions containing delivery_options
ast-grep --pattern 'class $_ { $$$ delivery_options: $_ $$$ }'
```

Length of output: 70

Script:

```shell
#!/bin/bash
# Search for class definitions and their context
rg -B 2 -A 10 "class.*DeliveryOptions"
```

Length of output: 1522

Script:

```shell
#!/bin/bash
# Search for the complete schema structure
rg -B 10 -A 20 "class.*FileBasedSpec"
```

Length of output: 17481
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3)
59-66: Consider simplifying the property name.

The property name preserve_subdirectories_directories seems to have a redundant "directories" suffix. Would preserve_subdirectories be clearer and more concise? The initialization logic looks good though! Wdyt?

```diff
-preserve_subdirectories_directories = True
+preserve_subdirectories = True

 def __init__(self, **kwargs: Any):
     if self.FILE_TRANSFER_KW in kwargs:
         self.use_file_transfer = kwargs.pop(self.FILE_TRANSFER_KW, False)
-    self.preserve_subdirectories_directories = kwargs.pop(
+    self.preserve_subdirectories = kwargs.pop(
         self.PRESERVE_SUBDIRECTORIES_KW, True
     )
```
111-126: Add type annotations and consider renaming the method.

The duplicate detection logic looks solid! A few suggestions to make it even better:

- The method name could be more Pythonic. Maybe _get_duplicated_filenames?
- Let's add proper type annotations to fix the pipeline failures.

```diff
-def _duplicated_files_names(self, slices: List) -> list[dict]:
+def _get_duplicated_filenames(
+    self,
+    slices: List[Dict[str, List[RemoteFile]]]
+) -> List[Dict[str, List[str]]]:
```

🧰 Tools
🪛 GitHub Actions: Linters
[error] 111-111: Missing type parameters for generic type "List"
[error] 111-111: Missing type parameters for generic type "dict"
133-145: Consider extracting the duplicate check into a guard clause.

The logic looks correct, but we could make it more readable by extracting the duplicate check into a guard clause at the beginning of the method. Wdyt?

```diff
 def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
     # Sort files by last_modified, uri and return them grouped by last_modified
     all_files = self.list_files()
     files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
     sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
     slices = [
         {self.FILES_KEY: list(group[1])}
         for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
     ]
-    if slices and not self.preserve_subdirectories_directories:
-        duplicated_files_names = self._duplicated_files_names(slices)
-        if duplicated_files_names:
-            raise DuplicatedFilesError(
-                format_duplicate_files_error_message(
-                    stream_name=self.name, duplicated_files_names=duplicated_files_names
-                ),
-                stream=self.name,
-            )
+    if slices and not self.preserve_subdirectories_directories:
+        self._check_for_duplicates(slices)
     return slices

+def _check_for_duplicates(self, slices: List[Dict[str, List[RemoteFile]]]) -> None:
+    duplicated_files_names = self._get_duplicated_filenames(slices)
+    if duplicated_files_names:
+        raise DuplicatedFilesError(
+            format_duplicate_files_error_message(
+                stream_name=self.name,
+                duplicated_files_names=duplicated_files_names
+            ),
+            stream=self.name,
+        )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
[error] 111-111: Missing type parameters for generic type "List"
[error] 111-111: Missing type parameters for generic type "dict"
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
8-25: LGTM! The new imports and constants are well-organized.

The new constants PRESERVE_SUBDIRECTORIES_KW and FILES_KEY follow the existing naming convention and are descriptive. The imports are logically grouped, though we could consider organizing them alphabetically within their groups. Wdyt?

Also applies to: 50-51
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/file_based/file_based_source.py (1)
392-401: Well-structured configuration handling.

The method follows good practices:

- Safely handles missing or incomplete configuration
- Preserves backward compatibility by defaulting to True
- Clear and descriptive attribute checks

What do you think about adding a docstring to explain the default behavior? wdyt?

```diff
 @staticmethod
 def _preserve_subdirectories_directories(parsed_config: AbstractFileBasedSpec) -> bool:
+    """
+    Determine whether to preserve subdirectories based on the configuration.
+
+    Returns True if:
+    - The configuration is missing
+    - The delivery_options is not set
+    - The preserve_subdirectories_directories flag is not set
+    """
     # fall back to preserve subdirectories if config is not present or incomplete
```

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
111-128: Well-implemented duplicate detection.

The implementation is efficient using sets and defaultdict. A few suggestions to make it even better:

- Would you consider adding type hints for the return value's inner types? wdyt?
- How about adding a docstring to explain the method's purpose and return format?

```diff
 def _duplicated_files_names(
     self, slices: List[dict[str, List[RemoteFile]]]
-) -> List[dict[str, List[str]]]:
+) -> List[dict[str, list[str]]]:
+    """
+    Identify duplicate file names across all slices.
+
+    Args:
+        slices: List of slices containing RemoteFile objects
+
+    Returns:
+        List of dictionaries mapping duplicate file names to their full paths
+    """
```
139-147: Clean integration of duplicate detection.

The duplicate check is well-integrated into the existing slice computation logic. However, consider extracting the duplicate check into a separate method for better readability. wdyt?

```diff
 def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
+    def _check_for_duplicates(slices: List[dict[str, List[RemoteFile]]]) -> None:
+        if not self.preserve_subdirectories_directories:
+            duplicated_files_names = self._duplicated_files_names(slices)
+            if duplicated_files_names:
+                raise DuplicatedFilesError(
+                    format_duplicate_files_error_message(
+                        stream_name=self.name, duplicated_files_names=duplicated_files_names
+                    ),
+                    stream=self.name,
+                )
+
     all_files = self.list_files()
     files_to_read = self._cursor.get_files_to_sync(all_files, self.logger)
     sorted_files_to_read = sorted(files_to_read, key=lambda f: (f.last_modified, f.uri))
     slices = [
         {self.FILES_KEY: list(group[1])}
         for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)
     ]
-    if slices and not self.preserve_subdirectories_directories:
-        duplicated_files_names = self._duplicated_files_names(slices)
-        if duplicated_files_names:
-            raise DuplicatedFilesError(
-                format_duplicate_files_error_message(
-                    stream_name=self.name, duplicated_files_names=duplicated_files_names
-                ),
-                stream=self.name,
-            )
+    if slices:
+        _check_for_duplicates(slices)
     return slices
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte_cdk/sources/file_based/exceptions.py (2 hunks)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py (2 hunks)
- airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (3)
airbyte_cdk/sources/file_based/exceptions.py (2)
114-115: LGTM! Clean exception class implementation.

The DuplicatedFilesError follows the established pattern of extending BaseFileBasedSourceError.
132-151: Great error message formatting!

The error message is clear, actionable, and includes:
- The specific stream where duplicates were found
- The number of duplicates for each file
- Clear guidance on how to resolve the issue
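A minimal sketch of such a formatter (the exact wording of the real `format_duplicate_files_error_message` is not shown in this thread, so the message text below is an assumption):

```python
def format_duplicate_files_error_message(stream_name, duplicated_files_names):
    # Each entry maps one duplicated base name to all of its full paths.
    details = "\n".join(
        f"  {name} ({len(paths)} occurrences): {', '.join(paths)}"
        for entry in duplicated_files_names
        for name, paths in entry.items()
    )
    return (
        f"Duplicate file names found for stream {stream_name} while directory "
        f"structure is not preserved:\n{details}\n"
        "Rename the files or re-enable preserving subdirectories to resolve this."
    )
```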
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
50-51: Good use of constants!

Using constants instead of magic strings improves maintainability.
Actionable comments posted: 0
🧹 Nitpick comments (4)
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
Line range hint 172-182: The logic looks good! Would you consider some readability improvements?

The implementation correctly uses the new configuration option, but we could make it even clearer with:

- More descriptive variable names (e.g., relative_file_path instead of file_relative_path)
- A more detailed comment explaining the path transformation logic

What do you think about this diff?

```diff
-# Remove left slashes from source path format to make relative path for writing locally
-file_relative_path = file.uri.lstrip("/")
+# Transform the source file path into a relative path for local writing:
+# - If preserving directories: maintain the path structure but remove leading slashes
+# - If not preserving: use only the filename
+relative_file_path = file.uri.lstrip("/") if preserve_subdirectories_directories else path.basename(file.uri)
```

airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (2)
34-38: The field definition looks good! Should we enhance the description?

The implementation is solid, but the description could be more informative about what this setting actually does. What do you think about making it clearer for users?

```diff
 preserve_subdirectories_directories: bool = Field(
     title="Preserve Subdirectories Directories",
-    description="Flag indicating we should preserve subdirectories directories",
+    description="When enabled, maintains the original directory structure of files when copying them to the destination. When disabled, all files are copied to a flat structure in the destination directory.",
     default=True,
 )
```
74-78: Should we use the same enhanced description here for consistency?

```diff
 preserve_subdirectories_directories: bool = Field(
     title="Preserve Subdirectories Directories",
-    description="Flag indicating we should preserve subdirectories directories",
+    description="When enabled, maintains the original directory structure of files when copying them to the destination. When disabled, all files are copied to a flat structure in the destination directory.",
     default=True,
 )
```

unit_tests/sources/file_based/scenarios/csv_scenarios.py (1)
526-544: The test configuration looks good! Should we add more test coverage?

The implementation correctly includes the new field in the test scenarios. However, we might want to add test cases that specifically verify the behavior when:

- preserve_subdirectories_directories is set to False
- Files are in nested subdirectories
- Edge cases like empty directories or files with identical names in different subdirectories
Would you like me to help draft these additional test scenarios?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (2 hunks)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py (2 hunks)
- unit_tests/sources/file_based/scenarios/csv_scenarios.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/file_based/file_based_source.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/file_based_stream_reader.py (1)
138-146: LGTM! The implementation looks solid.

The method handles all edge cases gracefully and maintains backward compatibility with the default True value.
… (when raw file is selected)
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)
393-393: What do you think about simplifying the method name? 🤔

The current name _preserve_subdirectories_directories seems a bit redundant with "directories" appearing twice. Maybe we could simplify it to _preserve_subdirectories? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (1 hunks)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py (2 hunks)
- unit_tests/sources/file_based/scenarios/csv_scenarios.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
- unit_tests/sources/file_based/scenarios/csv_scenarios.py
- airbyte_cdk/sources/file_based/file_based_stream_reader.py
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Publish SDM to DockerHub
- GitHub Check: Publish CDK version to PyPI
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/file_based/file_based_source.py (3)
245-245: Nice refactoring of the _make_default_stream method! 👍

The change to pass parsed_config instead of use_file_transfer is a good improvement. It consolidates configuration handling and makes the code more maintainable for future extensions.

Also applies to: 276-276, 288-288, 301-301, 313-316
392-401: Great defensive programming! 💪

The implementation safely handles all edge cases and provides a sensible default. I particularly like how it:
- Validates file transfer usage first
- Checks attribute existence
- Has null-safety checks
- Provides a safe default
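The checks listed above can be captured in a compact, `getattr`-based sketch (illustrative only; `SimpleNamespace` stands in for the real config classes):

```python
from types import SimpleNamespace

def preserve_directory_structure(parsed_config) -> bool:
    # A missing delivery_method, or a missing/None flag, both fall back to True.
    delivery = getattr(parsed_config, "delivery_method", None)
    value = getattr(delivery, "preserve_directory_structure", None)
    return True if value is None else bool(value)
```

The `getattr(..., None)` defaults collapse all the "config not present or incomplete" cases into a single `None` check, which keeps the safe `True` default in one place.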
392-401: Shall we verify the configuration usage? 🔍

Let's check if this new configuration option is properly documented and consistently used across the codebase.
Actionable comments posted: 0
🧹 Nitpick comments (5)
unit_tests/sources/file_based/test_file_based_stream_reader.py (2)
Line range hint 111-128: LGTM! Consider a small optimization for memory efficiency.

The implementation looks clean and efficient. Would you consider using a generator expression instead of a list comprehension for the return value to optimize memory usage when dealing with large numbers of duplicates? wdyt?

```diff
-return [
+return (
     {duplicated_file: file_paths[duplicated_file]}
     for duplicated_file in duplicates_file_names
-]
+)
```
371-440: LGTM! Consider adding edge cases.

The test coverage looks good! Would you consider adding a few edge cases to make it even more robust? For example:
- Empty directory paths
- Paths with special characters
- Very long paths
- Paths with spaces
This would help ensure the feature handles all scenarios gracefully. wdyt?
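As a rough sketch of what those edge cases could exercise (hypothetical paths; when paths are flattened, the behavior hinges on `path.basename`):

```python
from os import path

# Hypothetical URIs covering the edge cases above, mapped to the
# flattened name each should produce.
edge_cases = {
    "/deeply/nested/dir/data.csv": "data.csv",        # nested subdirectories
    "/dir with spaces/my file.csv": "my file.csv",    # paths with spaces
    "/weird/!@#$%.csv": "!@#$%.csv",                  # special characters
    "/a/" + "x" * 200 + ".csv": "x" * 200 + ".csv",   # very long path component
    "root_level.csv": "root_level.csv",               # no directory at all
}

for uri, expected_flattened_name in edge_cases.items():
    assert path.basename(uri) == expected_flattened_name
```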
airbyte_cdk/sources/file_based/file_based_source.py (1)
392-401: LGTM! Consider enhancing the comment.

The implementation looks solid! Would you consider adding a more descriptive docstring to explain:
- The purpose of the method
- The expected config structure
- The default behavior and why it defaults to True
- Example usage
Something like:
```python
@staticmethod
def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
    """
    Determines whether to preserve directory structure during file transfer.

    When enabled, files maintain their subdirectory paths in the destination.
    When disabled, files are flattened to the root of the destination.

    Args:
        parsed_config: The parsed configuration containing delivery method settings

    Returns:
        True if directory structure should be preserved (default), False otherwise
    """
```

wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[warning] File requires formatting. Run 'ruff format' to fix code formatting issues.
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
111-128: Consider using a more memory-efficient approach for large file sets, wdyt?

The current implementation stores all file paths in memory. For very large file sets, this could be memory intensive. Consider using a generator-based approach or processing files in chunks if memory usage becomes a concern.
Here's a potential optimization:
```diff
 def _duplicated_files_names(
     self, slices: List[dict[str, List[RemoteFile]]]
 ) -> List[dict[str, List[str]]]:
-    seen_file_names = set()
-    duplicates_file_names = set()
-    file_paths = defaultdict(list)
+    seen_file_names: Dict[str, List[str]] = defaultdict(list)
     for file_slice in slices:
         for file_found in file_slice[self.FILES_KEY]:
             file_name = path.basename(file_found.uri)
-            if file_name not in seen_file_names:
-                seen_file_names.add(file_name)
-            else:
-                duplicates_file_names.add(file_name)
-            file_paths[file_name].append(file_found.uri)
+            seen_file_names[file_name].append(file_found.uri)
     return [
-        {duplicated_file: file_paths[duplicated_file]}
-        for duplicated_file in duplicates_file_names
+        {file_name: paths}
+        for file_name, paths in seen_file_names.items()
+        if len(paths) > 1
     ]
```
370-469: Comprehensive test coverage for the new functionality!

The test class covers both successful and error scenarios with realistic test data. The error message validation is thorough.
Consider adding a test case for empty slices, wdyt?
```python
def test_when_compute_slices_with_empty_slices(self) -> None:
    with (
        mock.patch.object(DefaultFileBasedStream, "list_files", return_value=[]),
        mock.patch.object(self._stream._cursor, "get_files_to_sync", return_value=[]),
    ):
        returned_slices = self._stream.compute_slices()
        assert returned_slices == []
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py (1 hunks)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
- airbyte_cdk/sources/file_based/file_based_stream_reader.py (2 hunks)
- airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (3 hunks)
- unit_tests/sources/file_based/scenarios/csv_scenarios.py (1 hunks)
- unit_tests/sources/file_based/stream/test_default_file_based_stream.py (4 hunks)
- unit_tests/sources/file_based/test_file_based_stream_reader.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py
- airbyte_cdk/sources/file_based/file_based_stream_reader.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/file_based_source.py
[warning] File requires formatting. Run 'ruff format' to fix code formatting issues.
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (5)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
49-50: LGTM! The new constants and property are well-defined.

The constants follow the existing naming pattern and the property has a sensible default value.
Also applies to: 58-58
139-144: The duplicate file validation logic looks good!

The validation is correctly applied only when preserve_directory_structure is False, and the error message provides clear context about the duplicates.

unit_tests/sources/file_based/stream/test_default_file_based_stream.py (2)
310-322: LGTM! Good test setup.

The test setup properly initializes the stream with preserve_directory_structure=False.
342-368: The test data looks good!

The test uses realistic file paths and properly validates the computed slices.
unit_tests/sources/file_based/scenarios/csv_scenarios.py (1)
527-532: LGTM! The configuration option is well-defined.

The preserve_directory_structure option:
- Has a clear description
- Uses a sensible default value
- Is properly integrated into the delivery method schema
/autofix
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/file_based_source.py (2)
301-301: Consider adding return type hints to helper methods?

The helper methods _use_file_transfer and _preserve_directory_structure could benefit from explicit return type hints, wdyt? This would make the API contract clearer:

```diff
 @staticmethod
-def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
+def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
```

Also applies to: 313-314
390-399: Want to make this more concise using the walrus operator?

The implementation could be simplified while keeping the same fallback behavior (note the original suggestion inverted the fallback; the version below still returns True when the flag is absent), wdyt?

```diff
 @staticmethod
 def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
     # fall back to preserve subdirectories if config is not present or incomplete
-    if (
-        FileBasedSource._use_file_transfer(parsed_config)
-        and hasattr(parsed_config.delivery_method, "preserve_directory_structure")
-        and parsed_config.delivery_method.preserve_directory_structure is not None
-    ):
-        return parsed_config.delivery_method.preserve_directory_structure
-    return True
+    if FileBasedSource._use_file_transfer(parsed_config) and (
+        preserve := getattr(parsed_config.delivery_method, "preserve_directory_structure", None)
+    ) is not None:
+        return preserve
+    return True
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/file_based/file_based_source.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)
245-245: LGTM! Nice encapsulation improvement. Moving the use_file_transfer logic inside _make_default_stream makes the code more maintainable by centralizing the configuration handling.

Also applies to: 276-276, 288-288
Actionable comments posted: 0
🔭 Outside diff range comments (1)
airbyte_cdk/sources/file_based/file_based_source.py (1)
Test files need parameter updates for DefaultFileBasedStream constructor
Some test instances are missing the new parameters. Would you mind updating these?
test_default_file_based_stream.py:100: Missing both use_file_transfer and preserve_directory_structure
test_default_file_based_stream.py:297: Missing preserve_directory_structure
test_adapters.py:225: Missing both parameters

The production code looks good with both parameters properly handled! 🎉
🔗 Analysis chain
Line range hint 301-314: The parameter changes look good! Quick verification needed. The changes to _make_default_stream look well-structured. The method now computes use_file_transfer internally and adds support for preserve_directory_structure. Would you mind verifying that all callers of the DefaultFileBasedStream constructor are updated to handle these new parameters?

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all instantiations of DefaultFileBasedStream to verify constructor parameter updates
ast-grep --pattern 'DefaultFileBasedStream($$$)'

Length of output: 8901
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (1)
111-121: Consider using a more descriptive return type hint, wdyt? The method implementation is clean and efficient. However, the return type could be more descriptive to better document the structure:

 def _duplicated_files_names(
-    self, slices: List[dict[str, List[RemoteFile]]]
-) -> List[dict[str, List[str]]]:
+    self, slices: List[dict[str, List[RemoteFile]]]
+) -> List[dict[str, List[str]]]:  # Dict maps duplicate filename to list of full paths

airbyte_cdk/sources/file_based/file_based_source.py (1)
390-410: Great implementation! Consider a more concise version? The method is well-documented and handles edge cases correctly. What do you think about making it more concise like this?

 @staticmethod
 def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
     """
     Determines whether to preserve directory structure during file transfer.

     When enabled, files maintain their subdirectory paths in the destination.
     When disabled, files are flattened to the root of the destination.

     Args:
         parsed_config: The parsed configuration containing delivery method settings

     Returns:
         True if directory structure should be preserved (default), False otherwise
     """
-    if (
-        FileBasedSource._use_file_transfer(parsed_config)
-        and hasattr(parsed_config.delivery_method, "preserve_directory_structure")
-        and parsed_config.delivery_method.preserve_directory_structure is not None
-    ):
-        return parsed_config.delivery_method.preserve_directory_structure
-    return True
+    return (
+        not FileBasedSource._use_file_transfer(parsed_config)
+        or not hasattr(parsed_config.delivery_method, "preserve_directory_structure")
+        or parsed_config.delivery_method.preserve_directory_structure is None
+        or parsed_config.delivery_method.preserve_directory_structure
+    )
- Uses boolean logic to combine conditions
- Maintains the same behavior
- Reduces nesting
- Makes the default True case more explicit

What do you think? 🤔
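To make the default-to-True behavior concrete, here is a minimal, self-contained sketch of the fallback logic under discussion; the function and parameter names are illustrative, not the CDK's actual API:

```python
from types import SimpleNamespace


def preserve_directory_structure(delivery_method: object, use_file_transfer: bool) -> bool:
    # Honor the flag only when file transfer is on and the option is explicitly set;
    # otherwise fall back to preserving subdirectories (True).
    value = getattr(delivery_method, "preserve_directory_structure", None)
    if use_file_transfer and value is not None:
        return value
    return True


# Explicitly disabled -> flatten paths in the destination
flat = preserve_directory_structure(SimpleNamespace(preserve_directory_structure=False), True)
# Option absent from the config -> default to preserving structure
default = preserve_directory_structure(SimpleNamespace(), True)
```

Any concise rewrite of the real method should keep these three cases (explicit False, explicit True, absent/None) behaving the same way.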
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/file_based/file_based_source.py
(6 hunks)airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (3)
airbyte_cdk/sources/file_based/stream/default_file_based_stream.py (2)
49-50: LGTM! Clear constant naming. The new constants follow the existing naming pattern and make the code more maintainable.

132-137: LGTM! Clean duplicate detection implementation. The duplicate detection is correctly implemented and only runs when directory structure is not preserved. The error handling using DuplicatedFilesError follows the established pattern.

airbyte_cdk/sources/file_based/file_based_source.py (1)
245-245: LGTM! Consistent parameter updates across all call sites. The changes to pass parsed_config instead of use_file_transfer are applied consistently across all three call sites.

Also applies to: 276-276, 288-288
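The duplicate-file detection these comments reference (flagging same-named files from different folders, which would collide once directories are flattened) can be sketched roughly like this; duplicated_file_names is a hypothetical stand-in for the stream's _duplicated_files_names helper:

```python
from collections import defaultdict
from pathlib import PurePosixPath


def duplicated_file_names(paths: list[str]) -> list[dict[str, list[str]]]:
    # Group full remote paths by their base file name
    by_name: dict[str, list[str]] = defaultdict(list)
    for path in paths:
        by_name[PurePosixPath(path).name].append(path)
    # Keep only names that would collide once directories are flattened
    return [{name: full} for name, full in by_name.items() if len(full) > 1]


dupes = duplicated_file_names(["a/data.csv", "b/data.csv", "c/other.csv"])
# → [{"data.csv": ["a/data.csv", "b/data.csv"]}]
```

A non-empty result is what triggers the DuplicatedFilesError path mentioned above.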
…equired for google-drive
Thanks for all of these updates. Approved! ✅
…s in writable folder
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (2)
52-58: Consider improving error handling. The error handling could be more specific. Instead of catching all OSErrors, would it make sense to catch specific exceptions like PermissionError and provide more detailed error messages? For example:

 try:
     nltk_data_dir = "/airbyte/nltk_data"
     os.makedirs(nltk_data_dir, exist_ok=True)
-except OSError:
+except PermissionError:
     nltk_data_dir = "/tmp/nltk_data"
-    os.makedirs(nltk_data_dir, exist_ok=True)
+    try:
+        os.makedirs(nltk_data_dir, exist_ok=True)
+    except PermissionError as e:
+        raise RuntimeError(f"Unable to create NLTK data directory: {e}")
62-70: Consider consolidating NLTK downloads. The current implementation downloads tokenizers individually. Would it be cleaner to consolidate these into a list? For example:

-nltk.download("punkt", download_dir=nltk_data_dir)
-nltk.download("punkt_tab", download_dir=nltk_data_dir)
-nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir)
+tokenizers = ["punkt", "punkt_tab", "averaged_perceptron_tagger_eng"]
+for tokenizer in tokenizers:
+    nltk.download(tokenizer, download_dir=nltk_data_dir)

This would make it easier to maintain the list of required tokenizers. wdyt?
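The list-driven approach can be exercised without hitting the network by injecting a stub downloader; in the real code the stub would be nltk.download, and download_all is an illustrative helper, not part of the CDK:

```python
from typing import Callable


def download_all(tokenizers: list[str], download: Callable[..., None], download_dir: str) -> None:
    # One list to maintain instead of repeated hardcoded calls
    for tokenizer in tokenizers:
        download(tokenizer, download_dir=download_dir)


calls: list[tuple[str, str]] = []
download_all(
    ["punkt", "punkt_tab", "averaged_perceptron_tagger_eng"],
    lambda name, download_dir: calls.append((name, download_dir)),  # stub instead of nltk.download
    "/tmp/nltk_data",
)
# calls now records one (name, dir) pair per tokenizer, in order
```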
🧰 Tools
🪛 GitHub Actions: Linters
[error] 62-62: Call to untyped function "get_ntlk_temp_folder" in typed context
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
(2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
[error] 47-47: Function is missing a return type annotation
[error] 62-62: Call to untyped function "get_ntlk_temp_folder" in typed context
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (1)
5-5: LGTM! The os module import is appropriately placed and necessary for the directory operations.
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (1)
62-70: Add logging for better observability. Would it help to add logging to track download progress and failures? Here's a suggestion:

 try:
+    logger = logging.getLogger(__name__)
+    logger.info(f"Initializing NLTK data directory at {nltk_data_dir}")
     nltk.data.path.append(nltk_data_dir)
     nltk.data.find("tokenizers/punkt.zip")
     nltk.data.find("tokenizers/punkt_tab.zip")
     nltk.data.find("tokenizers/averaged_perceptron_tagger_eng.zip")
 except LookupError:
+    logger.info("Downloading required NLTK tokenizers...")
     nltk.download("punkt", download_dir=nltk_data_dir)
     nltk.download("punkt_tab", download_dir=nltk_data_dir)
     nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir)
+    logger.info("Successfully downloaded all required NLTK tokenizers")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (2)
5-5: LGTM! The os import is correctly placed with other standard library imports.
47-59: 🛠️ Refactor suggestion: Fix typo in function name and enhance error handling.
The function has a typo in its name and could benefit from improved error handling. What do you think about these changes?

-def get_ntlk_temp_folder() -> str:
+def get_nltk_temp_folder() -> str:
     """
     For non-root connectors /tmp is not currently writable, but we should allow it in the future.
     It's safe to use /airbyte for now. Fallback to /tmp for local development.
+
+    Returns:
+        str: Path to the NLTK data directory that was successfully created.
+
+    Raises:
+        OSError: If neither primary nor fallback directories could be created.
     """
     try:
         nltk_data_dir = "/airbyte/nltk_data"
         os.makedirs(nltk_data_dir, exist_ok=True)
     except OSError:
         nltk_data_dir = "/tmp/nltk_data"
         os.makedirs(nltk_data_dir, exist_ok=True)
+    if not os.access(nltk_data_dir, os.W_OK):
+        raise OSError(f"Neither /airbyte/nltk_data nor {nltk_data_dir} are writable")
     return nltk_data_dir

Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (2)
47-59: Fix typo in function name and enhance error handling. The implementation looks good, but I noticed a few things we could improve. What do you think about:
- Fixing the typo in the function name (ntlk → nltk)?
- Adding an error message in the except block to help with debugging?
- Using constants for the paths?

+AIRBYTE_NLTK_DATA_DIR = "/airbyte/nltk_data"
+TMP_NLTK_DATA_DIR = "/tmp/nltk_data"

-def get_ntlk_temp_folder() -> str:
+def get_nltk_temp_folder() -> str:
     """
     For non-root connectors /tmp is not currently writable, but we should allow it in the future.
     It's safe to use /airbyte for now. Fallback to /tmp for local development.
     """
     try:
-        nltk_data_dir = "/airbyte/nltk_data"
+        nltk_data_dir = AIRBYTE_NLTK_DATA_DIR
         os.makedirs(nltk_data_dir, exist_ok=True)
-    except OSError:
-        nltk_data_dir = "/tmp/nltk_data"
+    except OSError as e:
+        logger.debug(f"Failed to create {AIRBYTE_NLTK_DATA_DIR}: {e}. Falling back to {TMP_NLTK_DATA_DIR}")
+        nltk_data_dir = TMP_NLTK_DATA_DIR
         os.makedirs(nltk_data_dir, exist_ok=True)
     return nltk_data_dir
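The try-the-preferred-path-then-fall-back pattern above generalizes to any ordered list of candidate directories; here is a hedged, self-contained sketch (pick_writable_dir is illustrative, not part of the CDK):

```python
import os
import tempfile


def pick_writable_dir(candidates: list[str]) -> str:
    # Return the first candidate that can be created and written to
    for directory in candidates:
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError:
            continue
        if os.access(directory, os.W_OK):
            return directory
    raise OSError(f"No writable directory among {candidates}")


# A path nested under a regular file can never be created -> forces the fallback
fd, blocking_file = tempfile.mkstemp()
os.close(fd)
bad_candidate = os.path.join(blocking_file, "nltk_data")
fallback = os.path.join(tempfile.mkdtemp(), "nltk_data")
chosen = pick_writable_dir([bad_candidate, fallback])
# chosen is the fallback directory, created on demand
```

Expressed this way, the /airbyte vs. /tmp preference becomes a single ordered list rather than nested try/except blocks.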
62-70: Consider more granular error handling and logging. The NLTK data initialization looks good, but what do you think about making it more robust and informative? Here's a suggestion:

 try:
     nltk_data_dir = get_nltk_temp_folder()
     nltk.data.path.append(nltk_data_dir)
+    # Try loading each tokenizer separately for better error handling
     nltk.data.find("tokenizers/punkt.zip")
+except LookupError:
+    logger.info("Downloading punkt tokenizer...")
+    nltk.download("punkt", download_dir=nltk_data_dir, quiet=True)
+
+try:
     nltk.data.find("tokenizers/punkt_tab.zip")
+except LookupError:
+    logger.info("Downloading punkt_tab tokenizer...")
+    nltk.download("punkt_tab", download_dir=nltk_data_dir, quiet=True)
+
+try:
     nltk.data.find("tokenizers/averaged_perceptron_tagger_eng.zip")
-except LookupError:
-    nltk.download("punkt", download_dir=nltk_data_dir, quiet=True)
-    nltk.download("punkt_tab", download_dir=nltk_data_dir, quiet=True)
-    nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir, quiet=True)
+except LookupError:
+    logger.info("Downloading averaged_perceptron_tagger_eng tokenizer...")
+    nltk.download("averaged_perceptron_tagger_eng", download_dir=nltk_data_dir, quiet=True)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (1)
5-5: LGTM! Clean import addition. The os import is appropriately placed and necessary for the new directory operations.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (1)
46-47: Consider using environment variables for NLTK data paths? The paths are currently hardcoded. For better flexibility across different environments, what do you think about using environment variables with fallbacks? Something like:

-AIRBYTE_NLTK_DATA_DIR = "/airbyte/nltk_data"
-TMP_NLTK_DATA_DIR = "/tmp/nltk_data"
+AIRBYTE_NLTK_DATA_DIR = os.getenv("AIRBYTE_NLTK_DATA_DIR", "/airbyte/nltk_data")
+TMP_NLTK_DATA_DIR = os.getenv("TMP_NLTK_DATA_DIR", "/tmp/nltk_data")

This would make it easier to configure paths in different deployment scenarios, wdyt?
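The os.getenv pattern makes a hardcoded default overridable per environment without code changes; a quick demonstration (the variable name reuses the one from the suggestion above, and the override value is made up):

```python
import os

# With the variable unset, the hardcoded default wins
os.environ.pop("AIRBYTE_NLTK_DATA_DIR", None)
default_dir = os.getenv("AIRBYTE_NLTK_DATA_DIR", "/airbyte/nltk_data")

# Setting it in the environment overrides the default
os.environ["AIRBYTE_NLTK_DATA_DIR"] = "/custom/nltk_data"
override_dir = os.getenv("AIRBYTE_NLTK_DATA_DIR", "/airbyte/nltk_data")
# default_dir == "/airbyte/nltk_data", override_dir == "/custom/nltk_data"
```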
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/file_based/file_types/unstructured_parser.py (2)
50-62: Well-structured implementation! The function is well-documented, handles errors gracefully, and provides a clear fallback mechanism. Good job on the type hints and explanatory docstring!
65-73: Verify impact of additional tokenizer download. I notice you've added the averaged_perceptron_tagger_eng tokenizer. This is a good addition for enhanced text processing, but it might affect initialization time and disk space usage.

Let's check the size impact:
✅ Verification successful
NLTK tokenizer change looks good! 👍
I see the averaged_perceptron_tagger is already used in the test suite, so this change just formalizes its download in the main parser. The eng variant seems more specific, and the quiet flag keeps things tidy.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check size of NLTK data before and after downloading the new tokenizer
nltk_dir="/tmp/nltk_test"
mkdir -p "$nltk_dir"
# Download original tokenizers
python3 -c "import nltk; nltk.download('punkt', download_dir='$nltk_dir'); nltk.download('punkt_tab', download_dir='$nltk_dir')"
du -sh "$nltk_dir"
# Add new tokenizer
python3 -c "import nltk; nltk.download('averaged_perceptron_tagger_eng', download_dir='$nltk_dir')"
du -sh "$nltk_dir"

Length of output: 614
Script:
#!/bin/bash
# Search for NLTK tagger usage and related patterns
rg -i "averaged_perceptron_tagger|nltk.*tagger" -A 3
# Look for any NLTK downloads in the codebase
rg "nltk\.download" -A 2
# Check if this file had previous versions with different NLTK setup
git log -p -- airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | rg -A 3 -B 3 "nltk"

Length of output: 2236
WHAT
The Source config receives a new option: Preserve Sub-Directories in File Paths. By default this is enabled (the current behavior).
When enabled, the sync preserves each file's sub-directory path in the destination; when disabled, files are flattened to the root of the destination.
Here is a loom explaining the functionality.
If Copy Raw Files is selected (the default is True, but the image shows it turned off):
If Replicate Records is selected:
Summary by CodeRabbit
New Features
Improvements